import sxp
import XendDB
import EventServer; eserver = EventServer.instance()
-
+from XendError import XendError
+
"""The port for the migrate/save daemon xfrd."""
XFRD_PORT = 8002
def clientConnectionFailed(self, connector, reason):
print 'clientConnectionFailed>', 'connector=', connector, 'reason=', reason
+ self.xinfo.error(reason)
class XfrdInfo:
"""Abstract class for info about a session with xfrd.
Has subclasses for save and migrate.
"""
+ """Suspend timeout (seconds).
+ We set a timeout because suspending a domain can hang."""
+ timeout = 30
+
def __init__(self):
from xen.xend import XendDomain
self.xd = XendDomain.instance()
self.paused = {}
def vmconfig(self):
- print 'vmconfig>'
dominfo = self.xd.domain_get(self.src_dom)
- print 'vmconfig>', type(dominfo), dominfo
if dominfo:
val = sxp.to_string(dominfo.sxpr())
else:
val = None
- print 'vmconfig<', 'val=', type(val), val
return val
def error(self, err):
+ print 'Error>', err
self.state = 'error'
if not self.deferred.called:
+ print 'Error> calling errback'
self.deferred.errback(err)
def dispatch(self, xfrd, val):
def cberr(err):
v = ['xfr.err', errno.EINVAL]
sxp.show(v, out=xfrd.transport)
+ self.error(err)
op = sxp.name(val)
op = op.replace('.', '_')
if not err: return
self.error(err);
xfrd.loseConnection()
- #try:
- # self.xd.domain_unpause(self.src_dom)
- #except:
- # print >>sys.stdout, "Error unpausing domain:", self.src_dom
return None
def xfr_progress(self, xfrd, val):
def xfr_vm_suspend(self, xfrd, val):
"""Suspend a domain. Suspending takes time, so we return
a Deferred that is called when the suspend completes.
+ Suspending can hang, so we set a timeout and fail if it
+ takes too long.
"""
print 'xfr_vm_suspend>', val
try:
d = defer.Deferred()
# Subscribe to 'suspended' events so we can tell when the
# suspend completes. Subscribe to 'died' events so we can tell if
- # the domain died.
+ # the domain died. Set a timeout and error handler so the subscriptions
+ # will be cleaned up if suspending hangs or there is an error.
def onSuspended(e, v):
- print 'onSuspended>', e, v
+ print 'xfr_vm_suspend>onSuspended>', e, v
if v[1] != vmid: return
subscribe(on=0)
d.callback(v)
def onDied(e, v):
- print 'onDied>', e, v
+ print 'xfr_vm_suspend>onDied>', e, v
if v[1] != vmid: return
- subscribe(on=0)
d.errback(XendError('Domain died'))
def subscribe(on=1):
action('xend.domain.suspended', onSuspended)
action('xend.domain.died', onDied)
+ def cberr(err):
+ print 'xfr_vm_suspend>cberr>', err
+ subscribe(on=0)
+ return err
+
subscribe()
val = self.xd.domain_shutdown(vmid, reason='suspend')
self.suspended[vmid] = 1
+ d.addErrback(cberr)
+ d.setTimeout(self.timeout)
return d
except:
val = errno.EINVAL
eserver.inject('xend.migrate.ok', self.sxpr())
else:
self.state = 'error'
+ self.error(XendError("save failed"))
eserver.inject('xend.migrate.error', self.sxpr())
class XendSaveInfo(XfrdInfo):
eserver.inject('xend.save.ok', self.sxpr())
else:
self.state = 'error'
+ self.error(XendError("save failed"))
eserver.inject('xend.save.error', self.sxpr())
#include <getopt.h>
#include <errno.h>
#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
#include <time.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include "file_stream.h"
#include "string_stream.h"
#include "lzi_stream.h"
+#include "gzip_stream.h"
#include "sys_net.h"
#include "sys_string.h"
#include "select.h"
#define MODULE_NAME "XFRD"
-#define DEBUG 1
+#define DEBUG 0
#include "debug.h"
/*
int stringof(Sxpr exp, char **s){
int err = 0;
- dprintf(">\n");
- objprint(iostdout, exp, PRINT_TYPE); IOStream_print(iostdout, "\n");
+ //dprintf(">\n"); objprint(iostdout, exp, PRINT_TYPE); IOStream_print(iostdout, "\n");
if(ATOMP(exp)){
*s = atom_name(exp);
} else if(STRINGP(exp)){
err = -EINVAL;
*s = NULL;
}
- dprintf("< err=%d s=%s\n", err, *s);
+ //dprintf("< err=%d s=%s\n", err, *s);
return err;
}
int err = 0;
char *s;
unsigned long l;
- dprintf(">\n");
- objprint(iostdout, exp, 0); IOStream_print(iostdout, "\n");
+ //dprintf(">\n"); objprint(iostdout, exp, 0); IOStream_print(iostdout, "\n");
if(INTP(exp)){
*v = OBJ_INT(exp);
} else {
*v = (int)l;
}
exit:
- dprintf("< err=%d v=%d\n", err, *v);
+ //dprintf("< err=%d v=%d\n", err, *v);
return err;
}
char *h;
unsigned long a;
int err = 0;
- dprintf(">\n");
- objprint(iostdout, exp, 0); IOStream_print(iostdout, "\n");
+ //dprintf(">\n"); objprint(iostdout, exp, 0); IOStream_print(iostdout, "\n");
err = stringof(exp, &h);
if(err) goto exit;
if(get_host_address(h, &a)){
}
*v = a;
exit:
- dprintf("< err=%d v=%x\n", err, *v);
+ //dprintf("< err=%d v=%x\n", err, *v);
return err;
}
int portof(Sxpr exp, uint16_t *v){
char *s;
int err = 0;
- dprintf(">\n");
- objprint(iostdout, exp, 0); IOStream_print(iostdout, "\n");
+ //dprintf(">\n"); objprint(iostdout, exp, 0); IOStream_print(iostdout, "\n");
if(INTP(exp)){
*v = get_ul(exp);
*v = htons(*v);
*v = p;
}
exit:
- dprintf("< err=%d v=%u\n", err, *v);
+ //dprintf("< err=%d v=%u\n", err, *v);
return err;
}
err = Conn_sxpr(conn, &sxpr);
if(err) goto exit;
if(!sxpr_elementp(sxpr, oxfr_hello)){
- dprintf("> sxpr_elementp test failed\n");
+ wprintf("> sxpr_elementp test failed\n");
err = -EINVAL;
goto exit;
}
XFR_PROTO_MINOR);
if(err < 0) goto exit;
IOStream_flush(conn->out);
- dprintf("> xfr_response...\n");
err = xfr_response(conn);
exit:
dprintf("< err=%d\n", err);
dprintf("> Xfr xfr_addr=%s:%d\n", inet_ntoa(xfr_addr), ntohs(xfr_port));
err = Conn_connect(peer, flags, xfr_addr, xfr_port);
if(err) goto exit;
- printf("\n");
XfrState_set_state(state, XFR_HELLO);
// Send hello message.
err = xfr_send_hello(peer);
int plain_bytes = lzi_stream_plain_bytes(zio);
int comp_bytes = lzi_stream_comp_bytes(zio);
float ratio = lzi_stream_ratio(zio);
- dprintf("> Compression: plain %d bytes, compressed %d bytes, ratio %3.2f\n",
+ iprintf("> Compression: plain %d bytes, compressed %d bytes, ratio %3.2f\n",
plain_bytes, comp_bytes, ratio);
}
- printf("\n");
exit:
dprintf("> err=%d\n", err);
if(err && !XfrState_get_err(state)){
Conn_close(peer);
if(!err){
t1 = time(NULL) - t0;
- dprintf("> Transfer complete in %lu seconds\n", t1);
+ iprintf("> Transfer complete in %lu seconds\n", t1);
}
dprintf("> done err=%d, notifying xend...\n", err);
xfr_send_done(state, xend);
*/
int xfr_save(Args *args, XfrState *state, Conn *xend, char *file){
int err = 0;
+ int flags = (O_CREAT | O_EXCL | O_WRONLY);
+ int mode = 0644;
+ int fd;
IOStream *io = NULL;
dprintf("> file=%s\n", file);
- io = file_stream_fopen(file, "wb");
- if(!io){
- dprintf("> Failed to open %s\n", file);
+ fd = open(file, flags, mode);
+ if(fd < 0) {
+ eprintf("> Failed to open %s\n", file);
err = -EIO;
goto exit;
}
+ io = gzip_stream_fdopen(fd, "wb1");
+ if(!io){
+ eprintf("> Failed to allocate gzip state for %s\n", file);
+ err = -ENOMEM;
+ goto exit;
+ }
err = xen_domain_snd(xend, io, state->vmid, state->vmconfig, state->vmconfig_n);
if(err){
err = xfr_error(xend, err);
IOStream_close(io);
IOStream_free(io);
}
+ if(err){
+ unlink(file);
+ }
dprintf("< err=%d\n", err);
return err;
}
exit:
if(!err){
t1 = time(NULL) - t0;
- dprintf("> Transfer complete in %lu seconds\n", t1);
+ iprintf("> Transfer complete in %lu seconds\n", t1);
}
if(err){
xfr_error(peer, err);
dprintf(">\n");
err = Conn_init(conn, flags, peersock, peer_in);
if(err) goto exit;
- dprintf(">xfr_hello... \n");
+ //dprintf(">xfr_hello... \n");
err = xfr_hello(conn);
if(err) goto exit;
- dprintf("> sxpr...\n");
+ //dprintf("> sxpr...\n");
err = Conn_sxpr(conn, &sxpr);
if(err) goto exit;
- dprintf("> sxpr=\n");
- objprint(iostdout, sxpr, PRINT_TYPE); IOStream_print(iostdout, "\n");
+ //dprintf("> sxpr=\n");
+ //objprint(iostdout, sxpr, PRINT_TYPE); IOStream_print(iostdout, "\n");
if(sxpr_elementp(sxpr, oxfr_migrate)){
// Migrate message from xend.
uint32_t addr;
} else{
// Anything else is invalid.
err = -EINVAL;
- dprintf("> Invalid message: ");
+ eprintf("> Invalid message: ");
objprint(iostderr, sxpr, 0);
IOStream_print(iostderr, "\n");
xfr_error(conn, err);
int key = 0;
int long_index = 0;
+ dprintf(">\n");
set_defaults(args);
while(1){
key = getopt_long(argc, argv, short_opts, long_opts, &long_index);